package com.zhao.apitest.transform;

import com.zhao.apitest.beans.SensorReading;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * @author ZhaoPan
 * @date 2022/3/23
 * @describe flink中的富函数 richFunction
 */
public class TransformTest5_RichFunction {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(4);

        //从文件读取数据
        DataStream<String> inputStream = env.readTextFile("/Volumes/Update/flink/flink_test/src/main/resources/sensor.txt");

        // 转换成SensorReading类型
        DataStream<SensorReading> dataStream=inputStream.map(new MapFunction<String, SensorReading>() {
            @Override
            public SensorReading map(String s) throws Exception {
                String[] fields=s.split(",");
                return new SensorReading(fields[0],new Long(fields[1]),new Double(fields[2]));
            }
        });

        DataStream<Tuple2<String,Integer>> resultStream=dataStream.map(new MyMapper());
        resultStream.print();

        env.execute();
    }

    public static class MyMapper0 implements MapFunction<SensorReading,Tuple2<String,Integer>>{
        @Override
        public Tuple2<String, Integer> map(SensorReading value) throws Exception {
            return new Tuple2<>(value.getId(),value.getId().length());
        }
    }

    // 继承富函数
    public static class MyMapper extends RichMapFunction<SensorReading,Tuple2<String,Integer>>{
        @Override
        public Tuple2<String, Integer> map(SensorReading value) throws Exception {
            // getRuntimeContext().getState()
            return new Tuple2<String,Integer>(value.getId(),getRuntimeContext().getIndexOfThisSubtask());
        }

        @Override
        public void open(Configuration parameters) throws Exception {
            // 初始化工作，一般是定义状态，或者创建数据库链接
            System.out.println("open");
            // super.open(parameters);
        }

        @Override
        public void close() throws Exception {
            // 关闭链接，收尾状态
            System.out.println("close");
            // super.close();
        }
    }

}
